feat: response body mode framework for streaming support #169
|
Your PR is large. Please consider breaking it into multiple PRs. The |
Add ChunkProcessor interface that plugins declaring BodyChunked must implement. The framework validates this at startup and pre-computes the list of ChunkProcessors per profile. When streaming (no buffering), each response body chunk is passed through all ChunkProcessors before being acked to Envoy, enabling in-flight chunk transformation.

Depends on llm-d#169

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
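A minimal sketch of the startup check and chunk pass-through this commit describes. Only `ChunkProcessor` and `BodyChunked` come from the commit message; `BodyMode`, `Plugin`, `ProcessChunk`, `validateChunkProcessors`, and `runChunk` are hypothetical names chosen for illustration and may not match the PR's actual API.

```go
package main

import (
	"fmt"
	"strings"
)

// BodyMode is a hypothetical enum; only the name BodyChunked is taken from the commit message.
type BodyMode int

const (
	BodyFull BodyMode = iota
	BodyChunked
)

// Plugin is a hypothetical base interface for response plugins.
type Plugin interface {
	Name() string
	BodyMode() BodyMode
}

// ChunkProcessor is named in the commit; the method signature is an assumption.
type ChunkProcessor interface {
	ProcessChunk(chunk []byte) ([]byte, error)
}

// validateChunkProcessors collects the ChunkProcessors of one profile and fails
// if a plugin declares BodyChunked without implementing ChunkProcessor.
func validateChunkProcessors(plugins []Plugin) ([]ChunkProcessor, error) {
	var out []ChunkProcessor
	for _, p := range plugins {
		if p.BodyMode() != BodyChunked {
			continue
		}
		cp, ok := p.(ChunkProcessor)
		if !ok {
			return nil, fmt.Errorf("plugin %q declares BodyChunked but does not implement ChunkProcessor", p.Name())
		}
		out = append(out, cp)
	}
	return out, nil
}

// runChunk passes one chunk through every processor in order.
func runChunk(procs []ChunkProcessor, chunk []byte) ([]byte, error) {
	for _, p := range procs {
		var err error
		if chunk, err = p.ProcessChunk(chunk); err != nil {
			return nil, err
		}
	}
	return chunk, nil
}

// upper is a toy plugin that upper-cases each chunk.
type upper struct{}

func (upper) Name() string       { return "upper" }
func (upper) BodyMode() BodyMode { return BodyChunked }
func (upper) ProcessChunk(c []byte) ([]byte, error) {
	return []byte(strings.ToUpper(string(c))), nil
}

// broken declares BodyChunked but has no ProcessChunk method.
type broken struct{}

func (broken) Name() string       { return "broken" }
func (broken) BodyMode() BodyMode { return BodyChunked }

func main() {
	procs, err := validateChunkProcessors([]Plugin{upper{}})
	if err != nil {
		panic(err)
	}
	out, _ := runChunk(procs, []byte("hello"))
	fmt.Println(string(out))

	_, err = validateChunkProcessors([]Plugin{broken{}})
	fmt.Println(err)
}
```

Doing the type assertion once at startup keeps the per-chunk hot path free of interface checks.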
bf93834 to 381fe43
…cessing

Split response processing into two interfaces per review feedback:
- ResponseProcessor: processes the complete buffered response body (existing)
- ResponseChunkProcessor: processes individual chunks without buffering (new)

The Profile now carries both processor lists and a NeedsResponseBuffering flag. The config loader sorts response plugins into the right list based on which interface they implement. A plugin can implement both.

- When ResponseProcessor plugins are present → buffer the full response.
- When only ResponseChunkProcessors are present → stream chunks through.
- When neither is present → buffer (backward compatible).

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
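The sorting and buffering rules in this commit could look roughly like the sketch below. The interface names, `Profile`, and `NeedsResponseBuffering` come from the commit message; the method signatures, the list field names, and `buildProfile` are assumptions made for illustration.

```go
package main

import "fmt"

// ResponseProcessor handles the complete buffered body (signature assumed).
type ResponseProcessor interface {
	ProcessResponse(body []byte) ([]byte, error)
}

// ResponseChunkProcessor handles individual chunks (signature assumed).
type ResponseChunkProcessor interface {
	ProcessResponseChunk(chunk []byte) ([]byte, error)
}

// Profile carries both lists and the flag; the list field names are hypothetical.
type Profile struct {
	ResponseProcessors      []ResponseProcessor
	ResponseChunkProcessors []ResponseChunkProcessor
	NeedsResponseBuffering  bool
}

// buildProfile sorts plugins by the interfaces they implement. A plugin that
// implements both interfaces lands in both lists.
func buildProfile(plugins []any) Profile {
	var p Profile
	for _, pl := range plugins {
		if rp, ok := pl.(ResponseProcessor); ok {
			p.ResponseProcessors = append(p.ResponseProcessors, rp)
		}
		if cp, ok := pl.(ResponseChunkProcessor); ok {
			p.ResponseChunkProcessors = append(p.ResponseChunkProcessors, cp)
		}
	}
	// Stream only when chunk processors exist and no full-body processor does;
	// with no plugins at all, keep buffering (backward compatible).
	p.NeedsResponseBuffering = len(p.ResponseProcessors) > 0 || len(p.ResponseChunkProcessors) == 0
	return p
}

// Toy plugins used only for the demonstration below.
type fullOnly struct{}

func (fullOnly) ProcessResponse(b []byte) ([]byte, error) { return b, nil }

type chunkOnly struct{}

func (chunkOnly) ProcessResponseChunk(c []byte) ([]byte, error) { return c, nil }

func main() {
	fmt.Println("full only: ", buildProfile([]any{fullOnly{}}).NeedsResponseBuffering)
	fmt.Println("chunk only:", buildProfile([]any{chunkOnly{}}).NeedsResponseBuffering)
	fmt.Println("no plugins:", buildProfile(nil).NeedsResponseBuffering)
}
```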
381fe43 to 5f2053c
…alidate no mixing

- Simplify server.go buffering check to Profile.NeedsResponseBuffering (comment 1)
- Remove matched variable, use continue pattern in config loader (comment 2)
- Move chunk processing from server.go to HandleResponseChunk in response.go (comment 3)
- Add validation: reject profiles that mix ResponseProcessor and ResponseChunkProcessor plugins (comment 4)
- HandleResponseChunk runs chunk processors with metrics and logging, following the same pattern as HandleResponseBody
- Update streaming test to set NeedsResponseBuffering for buffered path

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
wantFullBody := []byte(`{"choices":[{"text":"Hello!"}]}`)

profiles := newTestProfiles()
profiles[testProfileName].NeedsResponseBuffering = true
Can we add unit tests in a follow-up to cover the new functionality?
- A mix of body response plugins and chunk response plugins fails.
- A profile with chunk plugins only performs streaming.
- A profile with full-body plugins only performs buffering.
- A profile with no plugins performs streaming.
Not a blocker for this PR (can be pushed in a follow-up).
Agreed — will add in a follow-up PR.
metrics.RecordRequestTTFT(model, reqCtx.ResponseFirstChunkTimestamp.Sub(reqCtx.RequestReceivedTimestamp))
responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody)
loggerVerbose.Info("processing response body complete")
} else {
now this looks nice and clean :)
- Move TTFT metric recording after if/else so it applies to both buffered and streaming paths (comment 3+4)
- Change ResponseChunkProcessor interface: takes string (framework converts once) + InferenceResponse (for header mutation) (comment 5)
- Add nil profile check and empty processor check in HandleResponseChunk for bodiless requests like GET /v1/models (comment 6)
- Extract runResponseChunkProcessors and buildStreamedChunkResponse following the same patterns as the body processing path

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
In FULL_DUPLEX_STREAMED mode, Envoy needs the deferred HeadersResponse before it forwards response body chunks to the client. Previously, buildStreamedChunkResponse only sent HeadersResponse on endOfStream, which caused empty response bodies when EoS was delayed or missing. Track ResponseHeadersSent on RequestContext and send HeadersResponse with the first chunk instead.

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
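A simplified sketch of the fix, assuming a stand-in `message` type in place of the real ext_proc `ProcessingResponse`. `RequestContext`, `ResponseHeadersSent`, and `buildStreamedChunkResponse` are named in the commit; the signature and return type here are illustrative only.

```go
package main

import "fmt"

// RequestContext carries only the field this commit adds.
type RequestContext struct {
	ResponseHeadersSent bool
}

// message is a hypothetical stand-in for an ext_proc ProcessingResponse.
type message struct {
	Kind        string // "headers" or "body"
	Body        []byte
	EndOfStream bool
}

// buildStreamedChunkResponse emits the deferred headers response together with
// the first chunk, then only body responses for later chunks.
func buildStreamedChunkResponse(reqCtx *RequestContext, chunk []byte, endOfStream bool) []message {
	var out []message
	if !reqCtx.ResponseHeadersSent {
		out = append(out, message{Kind: "headers"})
		reqCtx.ResponseHeadersSent = true
	}
	return append(out, message{Kind: "body", Body: chunk, EndOfStream: endOfStream})
}

func main() {
	reqCtx := &RequestContext{}
	first := buildStreamedChunkResponse(reqCtx, []byte("a"), false)
	second := buildStreamedChunkResponse(reqCtx, []byte("b"), true)
	fmt.Println(len(first), first[0].Kind, first[1].Kind)
	fmt.Println(len(second), second[0].Kind)
}
```

Keying the headers response on a per-request flag rather than on endOfStream means the client starts receiving body bytes even if the final chunk is delayed or never arrives.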
// HandleResponseChunk runs ResponseChunkProcessors on a single response body chunk
// and wraps the result in the ext_proc streaming response format.
func (s *Server) HandleResponseChunk(ctx context.Context, reqCtx *RequestContext, chunkBytes []byte, endOfStream bool) ([]*eppb.ProcessingResponse, error) {
Something is still wrong in this function.
Chunk processor plugins cannot mutate the chunk content, and even if they do, the result is ignored in L151.
Also, I've noticed that BodyMutation is always used even if the chunk hasn't changed, which is an expensive operation in Envoy.
We should also probably distinguish between the case where the chunk was mutated and the case where it was passed through.
We can improve that last point in a follow-up PR; always sending the mutation works for now.
But the first point is functionally wrong.
- Add CurrentChunk/SetChunk/ChunkMutated to InferenceResponse so chunk processors can read and mutate chunk content through the response object rather than a pass-by-value string parameter.
- HandleResponseChunk sets ResetChunkState before plugins run and uses the (potentially mutated) chunk in buildStreamedChunkResponse.
- Move TTFT metric recording after the if/else block so it fires for both buffered and streaming response paths. Change continue to break in the buffering accumulation branch so the metric code is reachable.

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
- runResponseChunkProcessors passes response.CurrentChunk to each plugin so mutations from earlier plugins are visible to later ones.
- Add unit tests for SetChunk, ChunkMutated, ResetChunkState.

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
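A sketch of how chunk state on the response object can make mutations visible across plugins. The names `InferenceResponse`, `CurrentChunk`, `SetChunk`, `ChunkMutated`, `ResetChunkState`, and `runResponseChunkProcessors` come from the commits; modelling `CurrentChunk` as a field, the `chunkProcessor` function type, and all signatures are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// InferenceResponse holds the chunk being processed. Treating CurrentChunk as
// an exported field is an assumption; the PR may expose it differently.
type InferenceResponse struct {
	CurrentChunk string
	chunkMutated bool
}

// ResetChunkState loads a fresh chunk and clears the mutation flag.
func (r *InferenceResponse) ResetChunkState(chunk string) {
	r.CurrentChunk = chunk
	r.chunkMutated = false
}

// SetChunk replaces the chunk content and records that it changed.
func (r *InferenceResponse) SetChunk(chunk string) {
	r.CurrentChunk = chunk
	r.chunkMutated = true
}

// ChunkMutated reports whether any plugin called SetChunk since the last reset.
func (r *InferenceResponse) ChunkMutated() bool { return r.chunkMutated }

// chunkProcessor is a hypothetical function form of ResponseChunkProcessor.
type chunkProcessor func(chunk string, resp *InferenceResponse) error

// runResponseChunkProcessors hands each plugin the current (possibly already
// mutated) chunk, so later plugins see earlier mutations.
func runResponseChunkProcessors(procs []chunkProcessor, resp *InferenceResponse) error {
	for _, p := range procs {
		if err := p(resp.CurrentChunk, resp); err != nil {
			return err
		}
	}
	return nil
}

// redact rewrites "foo" to "bar" and only calls SetChunk when something changed.
func redact(chunk string, resp *InferenceResponse) error {
	if out := strings.ReplaceAll(chunk, "foo", "bar"); out != chunk {
		resp.SetChunk(out)
	}
	return nil
}

// exclaim appends "!" only if it can see the result of redact.
func exclaim(chunk string, resp *InferenceResponse) error {
	if strings.Contains(chunk, "bar") {
		resp.SetChunk(chunk + "!")
	}
	return nil
}

func main() {
	resp := &InferenceResponse{}
	resp.ResetChunkState("foo")
	if err := runResponseChunkProcessors([]chunkProcessor{redact, exclaim}, resp); err != nil {
		panic(err)
	}
	fmt.Println(resp.CurrentChunk, resp.ChunkMutated())
}
```

The `ChunkMutated` flag is also what would let the framework skip a body mutation for pass-through chunks, the follow-up raised in the review above.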
|
Addressed the latest comments in two commits: 63991fd — chunk mutation via InferenceResponse:
63991fd — TTFT metric fix:
d793ccb — tests:
Also fixed |
Summary
- BodyProcessingMode type (Skip, Chunks, Full) and ResponseBodyRequirement interface for response plugins to declare their body access needs
- NeedsResponseBuffering computed per profile at startup based on plugin declarations
- When NeedsResponseBuffering=false for the selected profile, ack each response body chunk immediately, enabling real-time client streaming
- Full (backward compatible)

Body Processing Modes
- Skip
- Chunks (… ChunkProcessor)
- Full

How it works
At startup, the config loader iterates each profile's response plugins and checks their BodyProcessingMode() declaration:
- Full → NeedsResponseBuffering=true for that profile
- Otherwise → NeedsResponseBuffering=false, chunks are acked immediately

This is a per-profile decision, so different profiles can have different buffering behavior.
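The per-profile decision described above could be sketched as follows. `BodyProcessingMode`, `Skip`, `Chunks`, `Full`, `ResponseBodyRequirement`, and `computeResponseBuffering` appear in this description; the signature, the integer representation of the modes, and the `fixedMode` helper are assumptions.

```go
package main

import "fmt"

// BodyProcessingMode enumerates the three modes; the int encoding is assumed.
type BodyProcessingMode int

const (
	Skip BodyProcessingMode = iota
	Chunks
	Full
)

// ResponseBodyRequirement lets a response plugin declare its body access needs.
type ResponseBodyRequirement interface {
	BodyProcessingMode() BodyProcessingMode
}

// computeResponseBuffering returns true if any plugin in the profile declares
// Full; otherwise chunks can be acked immediately.
func computeResponseBuffering(plugins []ResponseBodyRequirement) bool {
	for _, p := range plugins {
		if p.BodyProcessingMode() == Full {
			return true
		}
	}
	return false
}

// fixedMode is a hypothetical plugin that always declares one mode.
type fixedMode BodyProcessingMode

func (m fixedMode) BodyProcessingMode() BodyProcessingMode { return BodyProcessingMode(m) }

func main() {
	profiles := map[string][]ResponseBodyRequirement{
		"streaming": {fixedMode(Skip), fixedMode(Chunks)},
		"buffered":  {fixedMode(Chunks), fixedMode(Full)},
	}
	for _, name := range []string{"streaming", "buffered"} {
		fmt.Printf("%s: NeedsResponseBuffering=%v\n", name, computeResponseBuffering(profiles[name]))
	}
}
```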
Test plan
- Tests for computeResponseBuffering covering all mode combinations
- Chunks without ChunkProcessor implementation fails at startup